package com.lock.zklock;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.zk.ZkClientHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryLoop;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.PredicateResults;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Zookeeer分布式互斥锁
 * 参考链接：
 * https://blog.csdn.net/crazymakercircle/article/details/85922561
 *
 * @author 007
 */
public class ZkDistributedMutex implements InterProcessLock {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final CuratorFramework client;
    private final String basePath;
    private final String lockName;
    private final String lockPath;
    private String selfPath;
    // private String waitPath;
    private String myID;
    public static String UNDER_LINE = ZkClientHelper.UNDER_LINE;

    private final ConcurrentMap<Thread, InnerLockData> threadData = Maps.newConcurrentMap();

    /**
     * 定义锁资源
     */
    private static class InnerLockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private InnerLockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    /**
     * 定义一个监听器Watcher
     *
     * Watcher监听器是一次性的，如果要反复使用，就需要反复的使用usingWatcher提前注册。所以，Watcher监听器不能应用于节点的数据变
     * 动或者节点变动这样的一般业务场景。而是适用于一些特殊的，比如会话超时、授权失败等这样的特殊场景。
     * 由于在开发过程中需要反复注册Watcher，比较繁琐，Curator引入了Cache来监听ZooKeeper服务端的事件(这里没有用到)。
     * Cache对ZooKeeper事件监听进行了封装，能够自动处理反复注册监听。
     */
    private final Watcher watcher = new Watcher() {
        /**
         * 事件回调方法
         * @param event 需要注意的是，WatchedEvent是curator封装过的事件，它没有实现Serializable接口。
         *              不要与zookeeper的WatcherEvent搞混了
         */
        @Override
        public void process(WatchedEvent event) {
            /**
             * 一、当KeeperState是SyncConnected时：
             * 1、EventType是None(-1)表示：客户端与服务端成功建立连接
             * 2、EventType是NodeCreated(1)表示：Watcher监听的对应数据节点被创建
             * 3、EventType是NodeDeleted(2)表示：Watcher监听的对应数据节点被删除
             * 4、EventType是NodeDataChanged (3)表示：Watcher监听的对应数据节点的数据内容发生变更
             * 5、EventType是NodeDataChanged (4)表示：Wather监听的对应数据节点的子节点列表发生变更
             *
             * 二、当KeeperState是Disconnected(0)时：
             * 1、EventType是None(-1)表示：客户端与ZooKeeper服务器断开连接
             *
             * 三、当KeeperState是Expired(-112)时：
             * 1、EventType是None(-1)表示：会话超时
             *
             * 四、当KeeperState是AuthFailed(4)时：
             * 1、EventType是None(-1)表示：1：使用错误的schema进行权限检查 2：SASL权限检查失败
             */

            // 这里只做了唤醒操作（唤醒在获取锁时被挂起的线程），因为这些事件基本都表明可以给后面线程获取锁的机会了
            notifyFromWatcher();
        }
    };

    /**
     * 构造
     *
     * @param client
     * @param basePath
     * @param thisLockName
     */
    ZkDistributedMutex(CuratorFramework client, String basePath, String thisLockName) {
        this.client = client;
        this.basePath = basePath;
        this.lockName = thisLockName + UNDER_LINE;
        this.lockPath = ZKPaths.makePath(basePath, thisLockName + UNDER_LINE);
        this.myID = Thread.currentThread().getName();
    }

    /**
     * 获取互斥锁，在没拿到锁之前，会重复挂起和唤醒，直到拿到锁
     *
     * @throws Exception
     */
    @Override
    public void acquire() throws Exception {
        if (!acquire(-1, null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

    /**
     * 获取互斥锁
     * 时间单位为空时，没有拿到锁直接将当前线程挂起，等到唤醒后再次获取锁，没有获取到锁再次挂起，重复执行直到拿到锁
     * 时间单位不为空时，第一次没拿到锁时，会在等待指定时间后再尝试一次，再拿不到锁直接返回false
     *
     * @param time
     * @param unit
     * @return
     * @throws Exception
     */
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
        InnerLockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            // 锁重入：和synchronized的重入原理类似，都是给当前线程记录获取锁次数
            lockData.lockCount.incrementAndGet();
            return true;
        }
        // 获取锁路径
        String lockPath = attemptLock(time, unit);
        if (lockPath != null) {
            // 利用锁路径创建锁资源
            InnerLockData newLockData = new InnerLockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }

    /**
     * 尝试获取锁
     * 时间单位为空时，没有拿到锁直接将当前线程挂起，等到唤醒后再次获取锁，没有获取到锁再次挂起，重复执行直到拿到锁
     * 时间单位不为空时，第一次没拿到锁时，会在等待指定时间后再尝试一次，再拿不到锁直接返回false
     *
     * @param time 等待时间
     * @param unit 等时间单位
     * @return
     * @throws Exception
     */
    String attemptLock(long time, TimeUnit unit) throws Exception {
        final long startMillis = System.currentTimeMillis();
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        int retryCount = 0;
        boolean hasTheLock = false;
        boolean isDone = false;
        while (!isDone) {
            isDone = true;
            try {
                // 创建锁路径：临时顺序子节点（EPHEMERAL_SEQUENTIAL）
                selfPath = client.create().creatingParentsIfNeeded()//.withProtection()
                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(lockPath);
                // 内部循环拿锁
                hasTheLock = internalLockLoop(startMillis, millisToWait);
            } catch (KeeperException.NoNodeException e) {
                long elapsedTimeMs = System.currentTimeMillis() - startMillis;
                RetryPolicy retryPolicy = client.getZookeeperClient().getRetryPolicy();
                boolean allowRetry = retryPolicy.allowRetry(retryCount++, elapsedTimeMs, RetryLoop.getDefaultRetrySleeper());
                if (allowRetry) {
                    // 有异常后允许重试
                    isDone = false;
                } else {
                    // 有异常，不允许重试，直接抛出异常
                    throw e;
                }
            }
        }

        if (hasTheLock) {
            return selfPath;
        }

        return null;
    }

    /**
     * 内部循环直到拿到锁
     * millisToWait为空时，没有拿到锁直接将当前线程挂起，等到唤醒后再次获取锁，没有获取到锁再次挂起，重复执行直到拿到锁
     * millisToWait不为空时，第一次没拿到锁时，会在等待指定时间后再尝试一次，再拿不到锁直接返回false
     *
     * @param startMillis
     * @param millisToWait
     * @return
     * @throws Exception
     */
    private boolean internalLockLoop(long startMillis, Long millisToWait) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        try {
            // 循环条件是：zk客户端是start状态，且还没拿到锁
            while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
                // 检查是否是最小节点
                PredicateResults getLockResults = checkMinPath();
                if (getLockResults.getsTheLock()) {
                    // 是最小节点，说明拿到锁了
                    haveTheLock = true;
                } else {
                    // 未拿到锁，取出要监听的路径
                    String workerPath = getLockResults.getPathToWatch();
                    synchronized (this) {
                        try {
                            // 添加监听器， 用 getData()替代exists()以避免泄露watchers资源
                            byte[] content = client.getData().usingWatcher(watcher).forPath(workerPath);
                            if (millisToWait != null) {
                                // 等待时间要减去前面执行代码的一段时间
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait <= 0) {
                                    // 过期, 删除当前锁节点
                                    doDelete = true;
                                    break;
                                }
                                // 挂起当前线程，在指定时间后自动唤醒再次尝试获取锁
                                wait(millisToWait);
                            } else {
                                // 在getLock时没有传入时间单位就挂起当前线程（wait后会释放当前线程持有的锁资源）
                                // 在监听器回调时唤醒
                                wait();
                            }
                        } catch (KeeperException.NoNodeException e) {
                            // it has been deleted (i.e. lock released).
                            // 异常不处理，重新获取锁
                        }
                    }
                }
            }
        } catch (Exception e) {
            doDelete = true;
            throw e;
        } finally {
            if (doDelete) {
                releaseLock(selfPath);
            }
        }
        return haveTheLock;
    }

    /**
     * 检查自己是不是最小的节点
     * zk是以路径节点作为锁的，如果是最小节点则说明拿到锁了
     *
     * @return
     * @throws Exception
     */
    public PredicateResults checkMinPath() throws Exception {
        String waitPath = null;

        List<String> children = client.getChildren().forPath(basePath);

        Lists.newArrayList(children);
        List<String> subNodes = Lists.newArrayList();
        int indx = 0;
        for (String item : children) {
            indx = item.lastIndexOf(UNDER_LINE);
            // 过滤出lockName开头的节点, 其他lock也在同一basePath目录下, 不需要参与排序和计算
            if (indx > 0 && StringUtils.equals(item.substring(0, indx + 1), lockName)) {
                subNodes.add(item);
            }
        }
        // 将各节点排序，下标为0的是最小节点
        Collections.sort(subNodes);

        int lockIndex = subNodes.indexOf(selfPath.substring(basePath.length() + 1));

        switch (lockIndex) {
            case -1: {
                // 未找到该节点
                throw new KeeperException.NoNodeException("Sequential path not found: " + selfPath);
            }
            case 0: {
                // 是最小节点，拿到锁
                return new PredicateResults(null, true);
            }
            default: {
                waitPath = basePath + "/" + subNodes.get(lockIndex - 1);
                // 没拿到锁，返回的是当前节点前面还在等待的节点
                return new PredicateResults(waitPath, false);
            }

        }

    }

    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName) throws Exception {
        int ourIndex = children.indexOf(sequenceNodeName);
        if (ourIndex < 0) {
            throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
        }

        boolean getsTheLock = ourIndex < 1;
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - 1);

        return new PredicateResults(pathToWatch, getsTheLock);
    }

    @Override
    public boolean isAcquiredInThisProcess() {
        return (threadData.size() > 0);
    }


    @Override
    public void release() throws Exception {
        Thread currentThread = Thread.currentThread();
        InnerLockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        // 释放锁时减1：重入锁释放时都是这个原理
        int newLockCount = lockData.lockCount.decrementAndGet();
        // 大于0，说明当前线程中还有其他线程没有释放该锁
        if (newLockCount > 0) {
            return;
        } else if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
            // 为0时，可以释放锁资源了
            releaseLock(lockData.lockPath);
        } finally {
            threadData.remove(currentThread);
        }
    }

    /**
     * zookeeper释放锁
     * @param lockPath 锁节点
     * @throws Exception
     */
    private void releaseLock(String lockPath) throws Exception {
        try {
            client.delete().guaranteed().forPath(lockPath);
        } catch (KeeperException.NoNodeException e) {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }

    /**
     * 通过监听器唤醒挂起的线程
     */
    private synchronized void notifyFromWatcher() {
        // 唤醒所有由于第一次没有获取到锁而被挂起的线程
        // 唤醒后再次抢锁
        notifyAll();
    }


}
